跳到主要内容

Spark 快速开始

配置

要在Spark中使用LakeSoul,请首先配置Spark Catalog。LakeSoul使用Apache Spark的DataSourceV2 API来实现数据源和目录。此外,LakeSoul还提供了 Scala 的表API,以扩展LakeSoul数据表的功能。

Spark 3 Support Matrix

LakeSoulSpark Version
2.2.x-2.4.x3.3.x
2.0.x-2.1.x3.1.x

Spark Shell/SQL

使用LakeSoulSparkSessionExtension sql扩展来运行spark-shell/spark-sql。

spark-shell --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul  --jars lakesoul-spark-2.5.0-spark-3.3.jar

Maven 项目依赖配置

<dependency>
<groupId>com.dmetasoul</groupId>
<artifactId>lakesoul</artifactId>
<version>2.5.0-spark-3.3</version>
</dependency>
// Scala
import org.apache.spark.sql.SparkSession
import spark.implicits._
import com.dmetasoul.lakesoul.tables.LakeSoulTable

val spark = SparkSession.builder()
.master("local")
.config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
.config("spark.sql.defaultCatalog", "lakesoul")
.getOrCreate()

创建命名空间

首先,为LakeSoul表创建一个namespace,如果不创建将使用默认的namespace,LakeSoul Catalog的默认namespace是default

// Spark SQL
CREATE NAMESPACE lakesoul_namespace;
USE lakesoul_namespace

创建表

使用USING lakesoul的子句创建一个分区的LakeSoul表

// Spark SQL
CREATE TABLE lakesoul_table (id BIGINT, date STRING, data STRING)
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table'

主键表

在LakeSoul中,带有主键的表被定义为哈希分区表。使用USING lakesoul子句,并结合TBLPROPERTIES设置(其中'hashPartitions'指定以逗号分隔的主键列表,'hashBucketNum'指定哈希桶的大小),可以创建一个哈希分区的LakeSoul表。

// Spark SQL
CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, date STRING, name STRING)
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table'
TBLPROPERTIES (
'hashPartitions'='id',
'hashBucketNum'='2')

主键 CDC 表

哈希分区的LakeSoul表具有可选的数据变更捕获(CDC)功能,能够记录数据的变化。要创建支持CDC的LakeSoul表,可以在哈希分区表的DDL语句中添加额外的TBLPROPERTIES设置,指定'lakesoul_cdc_change_column'属性。这个属性定义了一个隐式列,帮助表有效地处理CDC信息,从而实现对数据变更的精确追踪和管理。

// Spark SQL
CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, date STRING, name STRING)
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table'
TBLPROPERTIES(
'hashPartitions'='id',
'hashBucketNum'='2',
'lakesoul_cdc_change_column' = 'op'
)

数据插入/合并

要使用Spark SQL向非哈希分区表写入数据,请使用INSERT INTO语句。

要使用DataFrame向表写入数据,请使用DataFrameWriterV2 API。如果这是对该表的第一次写入,它还将自动创建相应的LakeSoul表。

// Spark SQL
INSERT INTO TABLE lakesoul_table VALUES (1, '2024-01-01', 'Alice'), (2, '2024-01-01', 'Bob'), (1, "2024-01-02", "Cathy")

要使用Spark SQL向哈希分区表写入数据,请使用Merge INTO语句。

要使用DataFrame向哈希分区表写入数据,请使用LakeSoulTableupsert API。

// Spark SQL
// Create source_view
CREATE OR REPLACE VIEW spark_catalog.default.source_view (id , date, data)
AS SELECT (1 as `id`, '2024-01-01' as `date`, 'data' as `data`)

// Merge source_view Into lakesoul_hash_table

MERGE INTO lakesoul_hash_table AS t
USING spark_catalog.default.source_view AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

数据更新

LakeSoul表可以通过DataFrame或使用标准的UPDATE语句进行更新。要使用DataFrame更新表中的数据,请使用LakeSoulTableupdateExpr API。

// Spark SQL
UPDATE table_namespace.table_name SET name = "David" WHERE id = 2

数据删除

LakeSoul表可以通过DataFrame或使用标准的DELETE语句来删除记录。要使用DataFrame从表中删除数据,请使用LakeSoulTabledelete API。

// Spark SQL
DELETE FROM lakesoul_table WHERE id =1

数据查询

LakeSoul表可以使用DataFrame或Spark SQL进行查询。

// Spark SQL
SELECT * FROM lakesoul_table

Time Travel查询

LakeSoul支持Time Travel查询,可以查询历史上任何时间点的表或两个提交时间之间的更改数据。

// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_cdc_table"
Seq(("range1", "hash1", "insert"), ("range2", "hash2", "insert"), ("range3", "hash2", "insert"), ("range4", "hash2", "insert"), ("range4", "hash4", "insert"), ("range3", "hash3", "insert"))
.toDF("range", "hash", "op")
.write
.mode("append")
.format("lakesoul")
.option("rangePartitions", "range")
.option("hashPartitions", "hash")
.option("hashBucketNum", "2")
.option("shortTableName", "lakesoul_cdc_table")
.option("lakesoul_cdc_change_column", "op")
.save(tablePath)
// record the version of 1st commit
val versionA: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)


val lakeTable = LakeSoulTable.forPath(tablePath)
lakeTable.upsert(Seq(("range1", "hash1-1", "delete"), ("range2", "hash2-10", "delete"))
.toDF("range", "hash", "op"))
// record the version of 2nd commit
val versionB: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)

lakeTable.upsert(Seq(("range1", "hash1-13", "insert"), ("range2", "hash2-13", "update"))
.toDF("range", "hash", "op"))
lakeTable.upsert(Seq(("range1", "hash1-15", "insert"), ("range2", "hash2-15", "update"))
.toDF("range", "hash", "op"))
// record the version of 3rd,4th commits
val versionC: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)

全量查询

// Scala
spark.sql("SELECT * FROM lakesoul_cdc_table")

快照查询

LakeSoul支持快照查询,可用于查询历史上某一时间点的表数据。

// Scala
spark.read.format("lakesoul")
.option(LakeSoulOptions.PARTITION_DESC, "range=range2")
.option(LakeSoulOptions.READ_END_TIME, versionB)
.option(LakeSoulOptions.READ_TYPE, ReadType.SNAPSHOT_READ)
.load(tablePath)

增量查询

LakeSoul支持增量查询,可获得在起始时间和结束时间之间发生更改的数据记录。

// Scala
spark.read.format("lakesoul")
.option(LakeSoulOptions.PARTITION_DESC, "range=range1")
.option(LakeSoulOptions.READ_START_TIME, versionA)
.option(LakeSoulOptions.READ_END_TIME, versionB)
.option(LakeSoulOptions.READ_TYPE, ReadType.INCREMENTAL_READ)
.load(tablePath)

更多案例

接下来,您可以在Spark API文档中了解更多关于在Spark中使用LakeSoul表的案例。